# -*- coding: utf-8 -*-
# 源数据库数据根据 bill_generation_date % 180 进行分表，当天数据会比较集中1-2个分表中
from datetime import timedelta
from utils.operators.spark_submit import SparkSubmitOperator

# Template parameters. Each value is a Jinja-style expression kept as a literal
# string here; this script only splices them into the job JSON below and does
# not render them itself.
jdbcUrl = "{{ var.json.mysql_spmi.db02 }}"
username = "{{ var.json.mysql_spmi.username }}"
password = "{{ var.json.mysql_spmi.password }}"
nowdt = "{{ execution_date | cst_ds }}"
nextdt = "{{ execution_date | date_add(1) | cst_ds }}"
table = 'spmi_piece_bill_exception'
env = "{{ var.value.env_sync }}"

# JSON job description handed to the Spark sync application: a MySQL "reader",
# a "channel" section and a Hive "writer". Tokens such as jdbcUrlpara, nowdt
# and tablepara are placeholders filled in by the substitution loop below.
# NOTE(review): the last section key is spelled "settting"; it is left as-is
# because the consuming job may depend on that exact spelling -- confirm.
_JOB_TEMPLATE = """{
"reader":{
"connect":{
"url":"jdbcUrlpara",
"username":"usernamepara",
"password":"passwordpara",
"driver":"com.mysql.cj.jdbc.Driver"
},
"dbtype":"mysql",
"tableName":"yl_spmibill_5.spmi_piece_bill_exception",
"subTableList":"[0,179]",
"where":"last_update_time_sync between str_to_date('nowdt 00:00:00', '%Y-%m-%d %H:%i:%s') and str_to_date('nowdt 23:59:59', '%Y-%m-%d %H:%i:%s')",
"query":"",
"splitColumn":"last_update_time_sync",
"equalitySectioning":0,
"containsnull":0,
"fetchsize":"1024",
"threadNumber":0
},
"channel":{
"filterAbnormalCharacter":0
},
"writer":{
"dbtype":"hive",
"tableName":"tablepara",
"database":"spmi_ods",
"writeMode": "overwrite",
"partitionColumn":"dt",
"partitionValue":"nowdt"},
"settting":{
"env":"envpara"}
}"""

# Placeholder -> value pairs, applied strictly in this order (the same order as
# a left-to-right chain of str.replace calls). The template currently contains
# no "nextdt" token, so that entry changes nothing.
_SUBSTITUTIONS = (
    ("jdbcUrlpara", jdbcUrl),
    ("usernamepara", username),
    ("passwordpara", password),
    ("nowdt", nowdt),
    ("nextdt", nextdt),
    ("tablepara", table),
    ("envpara", env),
)

jsonpara = _JOB_TEMPLATE
for _placeholder, _value in _SUBSTITUTIONS:
    jsonpara = jsonpara.replace(_placeholder, _value)


# Spark-submit task that runs the sync jar with the JSON job description
# (jsonpara) as its single application argument.
# NOTE(review): the Spark job name is suffixed with execution_date + 1 day,
# while the JSON uses execution_date itself for the partition value -- confirm
# this difference is intended.
spmi_ods__spmi_piece_bill_exception = SparkSubmitOperator(
    # Task identity and notification recipients.
    task_id="spmi_ods__spmi_piece_bill_exception",
    name="spmi_ods__spmi_piece_bill_exception_{{ execution_date | date_add(1) | cst_ds }}",
    email=["lukunming@jtexpress.com", "yl_etl@yl-scm.com"],
    # Pool assignment and run-time limit.
    pool="spmi_piece",
    pool_slots=4,
    execution_timeout=timedelta(hours=2),
    # Spark driver/executor sizing.
    driver_memory="4G",
    executor_memory="4G",
    executor_cores=2,
    num_executors=2,
    # Spark application: jar location, main class and its argument list.
    application="hdfs:///scheduler/jms/spark/sync/mysql/spark_sync.jar",  # Spark jar package
    java_class="com.yunlu.bigdata.jobs.synchrotool.DataSynchDriver",  # Spark main class
    application_args=[jsonpara],
)
